package org.budo.warehouse.logic.util;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.budo.support.lang.util.MapUtil;
import org.budo.support.lang.util.StringUtil;
import org.budo.warehouse.logic.api.DataEntry;
import org.budo.warehouse.logic.api.DataEntryPojo;
import org.budo.warehouse.logic.api.DataEntryPojo.Row;
import org.budo.warehouse.logic.api.DataMessage;
import org.budo.warehouse.logic.api.DataMessagePojo;
import org.budo.warehouse.service.entity.Pipeline;

import com.alibaba.fastjson.JSON;

import lombok.extern.slf4j.Slf4j;

/**
 * Static helpers for converting and describing {@link DataMessage} instances.
 *
 * @author limingwei
 */
@Slf4j
public class DataMessageUtil {
    /**
     * Lists the qualified table name of every data entry in the message.
     *
     * @param dataMessage the message whose entries are inspected
     * @return one {@code schemaName.tableName} string per entry, in entry order
     *         (duplicates are kept); empty when the message has no entry list
     */
    public static List<String> tables(DataMessage dataMessage) {
        List<String> list = new ArrayList<String>();

        for (DataEntry dataEntry : _dataEntriesOrEmpty(dataMessage)) {
            String schemaName = dataEntry.getSchemaName();
            String tableName = dataEntry.getTableName();

            list.add(schemaName + "." + tableName);
        }

        return list;
    }

    /**
     * Builds a {@link DataMessagePojo} whose entries carry the target schema and
     * table names resolved through {@link PipelineUtil} for the given pipeline.
     *
     * @param dataMessage the source message
     * @param pipeline    the pipeline used to resolve target names and the
     *                    target data node id
     * @return a new message with the source message id, the pipeline's target
     *         data node id and the converted entries
     */
    public static DataMessagePojo toMessagePojo(DataMessage dataMessage, Pipeline pipeline) {
        List<DataEntry> dataEntryPojos = new ArrayList<DataEntry>();

        for (DataEntry dataEntry : _dataEntriesOrEmpty(dataMessage)) {
            String targetSchema = PipelineUtil.targetSchema(pipeline, dataEntry);
            String targetTable = PipelineUtil.targetTable(pipeline, dataEntry);

            DataEntryPojo dataEntryPojo = new DataEntryPojo(dataEntry) // POJO copy for serialized transfer
                    .setSchemaName(targetSchema) // used for matching in the next step
                    .setTableName(targetTable);

            dataEntryPojos.add(dataEntryPojo);
        }

        Integer dataNodeId = pipeline.getTargetDataNodeId();
        return new DataMessagePojo(dataMessage.getId(), dataNodeId, dataEntryPojos);
    }

    /**
     * Copies the message into a {@link DataMessagePojo}, dropping every entry
     * that is an {@code INSERT} on table {@code t_entry_buffer}.
     *
     * @param dataMessage the source message
     * @return a new message with the same id and data node id and the
     *         remaining entries wrapped as {@link DataEntryPojo}
     */
    public static DataMessage toMessageBuffer(DataMessage dataMessage) {
        List<DataEntry> dataEntryPojos = new ArrayList<DataEntry>();

        for (DataEntry dataEntry : _dataEntriesOrEmpty(dataMessage)) {
            String tableName = dataEntry.getTableName();
            String eventType = dataEntry.getEventType();

            if (StringUtil.equals(tableName, "t_entry_buffer") && StringUtil.equals(eventType, "INSERT")) {
                // Skip inserts into the buffer table itself to avoid causing recursion.
                log.debug("#230 dataEntry={}", dataEntry);
                continue;
            }

            dataEntryPojos.add(new DataEntryPojo(dataEntry));
        }

        Integer dataNodeId = dataMessage.getDataNodeId();
        return new DataMessagePojo(dataMessage.getId(), dataNodeId, dataEntryPojos);
    }

    /**
     * Renders the message as a JSON string with the keys {@code id},
     * {@code dataNodeId} and {@code dataEntries}.
     *
     * @param dataMessage the message to render, may be {@code null}
     * @return the JSON text, or {@code null} when {@code dataMessage} is {@code null}
     */
    public static String toSimpleString(DataMessage dataMessage) {
        if (null == dataMessage) {
            return null;
        }

        List<Map<String, Object>> dataEntries = _dataEntry(dataMessage.getDataEntries());

        Map<String, Object> dataMessageMap = MapUtil.stringObjectLinkedHashMap("id", dataMessage.getId(), //
                "dataNodeId", dataMessage.getDataNodeId(), //
                "dataEntries", dataEntries);
        return JSON.toJSONString(dataMessageMap);
    }

    /**
     * Converts each entry into a map with the keys {@code eventType},
     * {@code schemaName}, {@code tableName} and {@code rows}; the key
     * {@code sql} is added only when the entry's SQL is not empty.
     *
     * @param dataEntries the entries to convert, may be {@code null}
     * @return one map per entry, or {@code null} when {@code dataEntries} is {@code null}
     */
    private static List<Map<String, Object>> _dataEntry(List<DataEntry> dataEntries) {
        if (null == dataEntries) {
            return null;
        }

        List<Map<String, Object>> list = new ArrayList<Map<String, Object>>();
        for (DataEntry dataEntry : dataEntries) {
            List<Row> rows = DataEntryUtil.toRows(dataEntry);
            List<Map<String, Object>> rowMaps = DataEntryUtil.rowsToSimplifyMaps(rows);

            Map<String, Object> dataEntryMap = MapUtil.stringObjectLinkedHashMap("eventType", dataEntry.getEventType() //
                    , "schemaName", dataEntry.getSchemaName() //
                    , "tableName", dataEntry.getTableName()//
                    , "rows", rowMaps);

            String sql = dataEntry.getSql();
            if (!StringUtil.isEmpty(sql)) {
                dataEntryMap.put("sql", sql);
            }

            list.add(dataEntryMap);
        }

        return list;
    }

    /**
     * Returns the message's entry list, substituting an empty list when the
     * message reports {@code null}, so callers can iterate without a null check.
     *
     * @param dataMessage the message to read, must not be {@code null}
     * @return the message's own entry list, or a new empty list
     */
    private static List<DataEntry> _dataEntriesOrEmpty(DataMessage dataMessage) {
        List<DataEntry> dataEntries = dataMessage.getDataEntries();
        return null == dataEntries ? new ArrayList<DataEntry>() : dataEntries;
    }
}